Amazon LinuxからTreasure Dataを使ってみる
Fluentdが良さげなのでTreasure Dataも使ってみる
Treasure Dataは、Big Data as a Serviceを提供しています。あらゆるログを転送する仕組みがFluentdでしたが、Treasure Dataは生成され転送されるログを溜め込んで価値ある情報として加工して提供しましょうと感じです。ある程度までは無料で使えるようですので早速使ってみましょう。
アカウントを作成する
クラウド型のサービスを利用しますのでアカウントを取得しましょう。
セットアップする
いつもの流れということで、まずはAmazon Linuxを立ち上げてください。そして以下のコマンドへと続きます。
コマンド1発でインストールが終わりそうな感じですが、Amazon Linuxに足りないものを入れておきましょう。
$ sudo yum update -y $ sudo yum install rubygems ruby-devel gcc make -y $ sudo gem install td iconv json
さくっと終わりましたね!
コマンド
どんなコマンドが使えるかヘルプで見てみます。
$ td usage: td [options] COMMAND [args] options: -c, --config PATH path to config file (~/.td/td.conf) -k, --apikey KEY use this API key instead of reading the config file -v, --verbose verbose mode -h, --help show help Basic commands: db # create/delete/list databases table # create/delete/list/import/export/tail tables query # issue a query job # show/kill/list jobs bulk_import # manage bulk import sessions result # create/delete/list result URLs Additional commands: sched # create/delete/list schedules that run a query periodically schema # create/delete/modify schemas of tables status # show scheds, jobs, tables and results apikey # show/set API key server # show status of the Treasure Data server help # show help messages Type 'td help COMMAND' for more information on a specific command.
さらに詳細なのはこちら
$ td help:all db:list # Show list of tables in a database db:show <db> # Describe information of a database db:create <db> # Create a database db:delete <db> # Delete a database table:list [db] # Show list of tables table:show <db> <table> # Describe information of a table table:create <db> <table> # Create a table table:delete <db> <table> # Delete a table table:import <db> <table> <files...> # Parse and import files to a table table:export <db> <table> # Dump logs in a table to the specified storage table:swap <db> <table1> <table2> # Swap names of two tables table:tail <db> <table> # Get recently imported logs table:partial_delete <db> <table> # Delete logs from the table within the specified time range bulk_import:list # List bulk import sessions bulk_import:show <name> # Show list of uploaded parts bulk_import:create <name> <db> <table> # Create a new bulk import session to the the table bulk_import:prepare_parts <files...> # Convert files into part file format bulk_import:prepare_parts2 <files...> # Convert files into part file format bulk_import:upload_part <name> <id> <path.msgpack.gz> # Upload or re-upload a file into a bulk import session bulk_import:upload_parts <name> <files...> # Upload or re-upload files into a bulk import session bulk_import:delete_part <name> <id> # Delete a uploaded file from a bulk import session bulk_import:delete_parts <name> <ids...> # Delete uploaded files from a bulk import session bulk_import:perform <name> # Start to validate and convert uploaded files bulk_import:error_records <name> # Show records which did not pass validations bulk_import:commit <name> # Start to commit a performed bulk import session bulk_import:delete <name> # Delete a bulk import session bulk_import:freeze <name> # Reject succeeding uploadings to a bulk import session bulk_import:unfreeze <name> # Unfreeze a frozen bulk import session result:list # Show list of result URLs result:show <name> # Describe information of a result URL result:create <name> <URL> # Create a result URL result:delete <name> # Delete a result URL status # Show schedules, jobs, tables and results schema:show <db> <table> # Show schema of a table schema:set <db> <table> [columns...] # Set new schema on a table schema:add <db> <table> <columns...> # Add new columns to a table schema:remove <db> <table> <columns...> # Remove columns from a table sched:list # Show list of schedules sched:create <name> <cron> <sql> # Create a schedule sched:delete <name> # Delete a schedule sched:update <name> # Modify a schedule sched:history <name> [max] # Show history of scheduled queries sched:run <name> <time> # Run scheduled queries for the specified time query <sql> # Issue a query job:show <job_id> # Show status and result of a job job:list [max] # Show list of jobs job:kill <job_id> # Kill or cancel a job password:change # Change password apikey:show # Show Treasure Data API key apikey:set <apikey> # Set Treasure Data API key user:list # Show list of users user:show <name> # Show an user user:create <name> # Create an user user:delete <name> # Delete an user user:apikey:list <name> # Show API keys user:password:change <name> # Change password role:list # Show list of roles role:show <name> # Show a role role:create <name> # Create a role role:delete <name> # Delete a role role:grant <name> <user> # Grant role to an user role:revoke <name> <user> # Revoke role from an user org:list # Show list of organizations org:show <name> # Show an organizations org:create <name> # Create an organizations org:delete <name> # Delete an organizations acl:list # Show list of access controls acl:grant <subject> <action> <scope> # Grant an access control acl:revoke <subject> <action> <scope> # Revoke an access control server:status # Show status of the Treasure Data server sample:apache <path.json> # Create a sample log file Type 'td help COMMAND' for more information on a specific command.
データベースの作成
続いて接続確認とデータベースの作成です。次の操作を教えてくれるのは心遣いですね。
$ td account -f Enter your Treasure Data credentials. Email: [email protected] Password (typing will be hidden): Authenticated successfully. Use 'td db:create <db_name>' to create a database.
データベースの作成コマンドです。
$ td db:create testdb Database 'testdb' is created. Use 'td table:create testdb <table_name>' to create a table.
テーブルの作成
ナビゲーションに沿ってテーブルを作成します。
$ td table:create testdb www_access Table 'testdb.www_access' is created.
サンプルデータの生成と挿入
テーブルを作成したのでFluentdからデータを転送しても良いのですが取り急ぎ動作確認するためにサンプルデータを入れています。
$ td sample:apache apache.json Create apache.json with 5000 records whose time is from Sun Jan 20 23:50:16 +0000 2013 to Mon Jan 21 17:42:46 +0000 2013. Use 'td table:import <db> <table> --json apache.json' to import this file. $ td table:import testdb www_access --json apache.json importing apache.json... uploading 117247 bytes... imported 5000 entries from apache.json. done.
テーブル一覧の表示
作成したテーブルを見てみましょう。
$ td table:list +----------+------------+------+-------+--------+--------------------------------+--------+ | Database | Table | Type | Count | Size | Last import | Schema | +----------+------------+------+-------+--------+--------------------------------+--------+ | testdb | www_access | log | 5000 | 0.0 GB | Mon Jan 21 17:43:03 +0000 2013 | | +----------+------------+------+-------+--------+--------------------------------+--------+ 1 row in set
クエリーを発行する
今回のサンプルはレコード数が少ないですが、実際のビジネスでは数億とかいう規模のデータをドカーンと入れて、Hadoop使って解析してくれるわけですね。んで、この問合せ方法としてSQLライクに表記できるHiveを使うことができるようです。最新の技術に対して使いなれた構文でアクセスできるのは嬉しいですね。
件数の取得
まずはレコードの件数を出してみましょう。
$ td query -w -d testdb "SELECT count(*) FROM www_access" Job 1568184 is queued. Use 'td job:show 1568184' to show the status. queued... started at 2013-01-22T16:51:19Z Hive history file=/tmp/1461/hive_job_log__91678880.txt Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapred.reduce.tasks=<number> Starting Job = job_201301150013_26879, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26879 Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26879 2013-01-22 16:51:32,501 Stage-1 map = 0%, reduce = 0% 2013-01-22 16:51:36,561 Stage-1 map = 100%, reduce = 0% finished at 2013-01-22T16:51:48Z 2013-01-22 16:51:45,650 Stage-1 map = 100%, reduce = 100% Ended Job = job_201301150013_26879 OK MapReduce time taken: 22.455 seconds Time taken: 22.598 seconds Status : success Result : +------+ | _c0 | +------+ | 5000 | +------+ 1 row in set
条件を入れて検索したいところですが、どんなカラムがあるか分からないので1件だけデータを取得してみます。
$ td query -w -d testdb "SELECT * FROM www_access limit 1" Job 1568185 is queued. Use 'td job:show 1568185' to show the status. queued... started at 2013-01-22T16:53:54Z Hive history file=/tmp/1461/hive_job_log__1446690009.txt finished at 2013-01-22T16:54:03Z OK MapReduce time taken: 0.189 seconds Time taken: 1.841 seconds Status : success Result : +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+ | v | time | +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+ | {"path":"/category/electronics","host":"176.72.215.97","method":"GET","user":"-","agent":"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; YTB730; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; .NET4.0E; Media Center PC 6.0)","referer":"/item/books/919","code":"200","time":"1358787594","size":"76"} | 1358787594 | +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+ 1 row in set
条件を付けて検索
カラム名が分かったので条件を付けてみたいと思います。agentにMacintoshが含まれるレコードの件数です。全文検索っすね!
$ td query -w -d testdb "SELECT count(*) FROM www_access where v['agent'] like '%Macintosh%'" Job 1568190 is queued. Use 'td job:show 1568190' to show the status. queued... started at 2013-01-22T16:58:02Z Hive history file=/tmp/1461/hive_job_log__1636264800.txt Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapred.reduce.tasks=<number> Starting Job = job_201301150013_26886, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26886 Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26886 2013-01-22 16:58:15,145 Stage-1 map = 0%, reduce = 0% 2013-01-22 16:58:20,202 Stage-1 map = 100%, reduce = 0% 2013-01-22 16:58:29,299 Stage-1 map = 100%, reduce = 100% Ended Job = job_201301150013_26886 OK MapReduce time taken: 22.395 seconds Time taken: 22.539 seconds finished at 2013-01-22T16:58:32Z Status : success Result : +-----+ | _c0 | +-----+ | 445 | +-----+ 1 row in set
GROUP BYとORDER BYとLIMIT
SQLといえばグループ化と順序付けでしょう。ということで使ってみます。agentでグループ化してTOP3を表示しています。
$ td query -w -d testdb "SELECT v['agent'] as agent , count(*) as count FROM www_access group by v['agent'] order by count desc limit 3" Job 1568253 is queued. Use 'td job:show 1568253' to show the status. queued... started at 2013-01-22T17:01:12Z Hive history file=/tmp/1461/hive_job_log__1562578541.txt Total MapReduce jobs = 2 Launching Job 1 out of 2 Number of reduce tasks not specified. Defaulting to jobconf value of: 12 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapred.reduce.tasks=<number> Starting Job = job_201301150013_26899, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26899 Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26899 2013-01-22 17:01:29,672 Stage-1 map = 0%, reduce = 0% 2013-01-22 17:01:35,742 Stage-1 map = 100%, reduce = 0% 2013-01-22 17:01:45,845 Stage-1 map = 100%, reduce = 8% 2013-01-22 17:01:53,920 Stage-1 map = 100%, reduce = 17% 2013-01-22 17:02:02,025 Stage-1 map = 100%, reduce = 25% 2013-01-22 17:02:10,120 Stage-1 map = 100%, reduce = 33% 2013-01-22 17:02:18,195 Stage-1 map = 100%, reduce = 42% 2013-01-22 17:02:25,260 Stage-1 map = 100%, reduce = 50% 2013-01-22 17:02:33,375 Stage-1 map = 100%, reduce = 58% 2013-01-22 17:02:41,450 Stage-1 map = 100%, reduce = 67% 2013-01-22 17:02:49,526 Stage-1 map = 100%, reduce = 75% 2013-01-22 17:02:57,601 Stage-1 map = 100%, reduce = 83% 2013-01-22 17:03:04,667 Stage-1 map = 100%, reduce = 92% 2013-01-22 17:03:12,742 Stage-1 map = 100%, reduce = 100% Ended Job = job_201301150013_26899 Launching Job 2 out of 2 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapred.reduce.tasks=<number> Starting Job = job_201301150013_26904, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26904 Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26904 2013-01-22 17:03:20,258 Stage-2 map = 0%, reduce = 0% 2013-01-22 17:03:23,285 Stage-2 map = 100%, reduce = 0% 2013-01-22 17:03:32,429 Stage-2 map = 100%, reduce = 100% Ended Job = job_201301150013_26904 finished at 2013-01-22T17:03:35Z OK MapReduce time taken: 133.066 seconds Time taken: 133.249 seconds Status : success Result : +-------------------------------------------------------------------------------------+-------+ | agent | count | +-------------------------------------------------------------------------------------+-------+ | Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1 | 630 | | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0) | 497 | | Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1 | 445 | +-------------------------------------------------------------------------------------+-------+ 3 rows in set
FluentdからTreasure Dataにデータを送る
基本的な使い方が分かりましたので、Fluentdからデータを送り込んでみましょう!Fluentdのセットアップは前回やりましたので割愛しますね。FluentdからTreasure Dataにデータを送るためにはAPIKEYが必要になりますので表示してみます。
$ td apikey XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
次にプラグインをインストールします。
$ sudo yum install make gcc -y $ sudo /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-td
プラグインに会わせてtd-agentの設定ファイルを記述したいと思います。
$ cd /etc/td-agent $ sudo vi td-agent.conf
発生したApacheのログをS3とMongoとTreasure Dataに転送しています。
<source> type forward port 22222 </source> <source> type tail path /var/log/httpd/access_log format apache tag td.testdb.www_access </source> <match *.**> type copy <store> type tdlog apikey XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX auto_create_table buffer_type file buffer_path /var/log/td-agent/buffer/td use_ssl true flush_interval 10s </store> <store> type mongo database httpd collection accesslog host localhost port 27017 flush_interval 10s </store> <store> type forest subtype s3 <template> s3_bucket akari-ec2-log s3_endpoint s3-ap-northeast-1.amazonaws.com path ${tag}/ buffer_path /var/log/td-agent/buffer/${tag} time_slice_format %Y/%m/%d/ec2-%Y-%m-%d-%H flush_interval 1m </template> </store> </match>
tdコマンドでデータが登録されているか見てみましょう。
$ td table:list +----------+------------+------+-------+--------+--------------------------------+--------+ | Database | Table | Type | Count | Size | Last import | Schema | +----------+------------+------+-------+--------+--------------------------------+--------+ | fluent | info | log | 11 | 0.0 GB | Tue Jan 22 18:25:01 +0000 2013 | | | testdb | www_access | log | 5042 | 0.0 GB | Tue Jan 22 18:25:04 +0000 2013 | | +----------+------------+------+-------+--------+--------------------------------+--------+ 2 rows in set
確かにカウントが増えていますね。td-agentからTreasureDataにデータを送る際に注意する点ですが、ログに付けているタグ名からデータベース名やテーブル名を判別しています。td.データベース名.テーブル名になるようにタグ付けしてくださいね。リクエストとしてはタグ付けでインスタンスIDやIPアドレスを入れたいこともあるので、データベース名やテーブル名はタグ以外で指定できるほうがうれしいかな。ログがどのインスタンスで発生したのか何かしら知りたい時ってないかなぁ。仮想サーバーは使い捨てと思えば不要かなぁ。
まとめ
自前でHadoop環境作ってデータ解析するのは正直面倒だなぁと思っている方は、FluentdとTreasureDataを組み合わせてお気楽ビックデータ解析をしてみましょう。今回の収穫は、Hiveを使ってビックデータを簡単に問合せ・集計できたことでした。特定カラムの部分一致をカウントしてランキングする等、実務でも十分活用できそうです!今日から君もビックデータの魔法使いになろう!
参考資料
Continuous Data Import from Fluentd